문서의 임의 삭제는 제재 대상으로, 문서를 삭제하려면 삭제 토론을 진행해야 합니다. 문서 보기문서 삭제토론 행위자 모델 (문단 편집)
=== Scala + Akka ===
{{{#!syntax java
/** A small map-reduce pipeline assembled from Akka classic actors.
  *
  * Data flow: `MapReduce` (splits a batch into elements) -> `Map.Single` / `Map.Multiple`
  * (applies `map`) -> `Reduce` (folds results into a running state) -> `IO` (reports the state).
  * Every hop carries its payload wrapped in a [[Example.Message]].
  */
object Example {
  import akka.actor.{Actor, ActorRef, Props}

  /** Factory for the top-level [[MapReduce]] coordinator actor. */
  object MapReduce {
    // Created on first access and shared by every pipeline built through `apply`.
    // NOTE(review): nothing in this snippet ever terminates this actor system.
    lazy val sys: akka.actor.ActorSystem = akka.actor.ActorSystem()

    /** Starts a coordinator actor and returns its reference.
      *
      * @param map           transformation applied to each element sent to the pipeline
      * @param nrOfMapActors number of mapping actors; values below 2 select a single mapper
      * @param reduce        combines the running state with one mapped value
      * @param state         initial value of the running state
      * @param io            callback invoked with the state after each reduction
      * @return reference to the coordinator; send it a `Seq[A]` to feed the pipeline
      */
    def apply[A, B, C](
        map: A => B,
        nrOfMapActors: Int,
        reduce: (C, B) => C,
        state: C,
        io: C => Unit
    ): ActorRef =
      sys.actorOf(Props(classOf[MapReduce[A, B, C]], map, nrOfMapActors, reduce, state, io))
  }

  /** Coordinator: owns the IO, reduce and map stages and fans incoming batches out to the mapper. */
  class MapReduce[A, B, C](
      map: A => B,
      nrOfMapActors: Int,
      reduce: (C, B) => C,
      state: C,
      io: C => Unit
  ) extends Actor {
    import context.actorOf

    // The stages are lazy, so each child actor is created on first use; forcing `mapActor`
    // forces `reduceActor`, which in turn forces `ioActor`.
    lazy val ioActor: ActorRef = actorOf(Props(classOf[IO[C]], io))
    lazy val reduceActor: ActorRef =
      actorOf(Props(classOf[Reduce[B, C]], reduce, state, ioActor))
    lazy val mapActor: ActorRef =
      if (nrOfMapActors < 2) actorOf(Props(classOf[Map.Single[A, B]], map, reduceActor))
      else actorOf(Props(classOf[Map.Multiple[A, B]], map, nrOfMapActors, reduceActor))

    def receive: Receive = {
      // The element type is erased at runtime: any Seq matches here, and the elements are
      // trusted to be of type A. `@unchecked` makes that assumption explicit.
      case elems: Seq[A @unchecked] =>
        elems.foreach(elem => mapActor ! Message(elem))
    }
  }

  /** Mapping stage, in a single-actor and a routed multi-actor flavour. */
  object Map {

    /** Applies `map` to the contents of each incoming message and forwards the result. */
    class Single[A, B](map: A => B, reduceActor: ActorRef) extends Actor {
      def receive: Receive = {
        // Payload type is erased; contents are trusted to be of type A.
        case msg: Message[A @unchecked] =>
          reduceActor ! Message(map(msg.contents))
      }
    }

    /** Distributes incoming messages round-robin over `nrOfActors` watched [[Single]] children. */
    class Multiple[A, B](map: A => B, nrOfActors: Int, reduceActor: ActorRef) extends Actor {
      import akka.routing.{Router, RoundRobinRoutingLogic, ActorRefRoutee}
      import akka.actor.Terminated
      import context.{watch, actorOf}

      // Reassigned whenever a routee terminates (see `receive`).
      var router: Router = Router(
        RoundRobinRoutingLogic(),
        for (_ <- 1 to nrOfActors) yield ActorRefRoutee(single)
      )

      /** Creates a new watched [[Single]] child; every call spawns another actor. */
      def single: ActorRef =
        watch(actorOf(Props(classOf[Single[A, B]], map, reduceActor)))

      def receive: Receive = {
        // Payload type is erased; contents are trusted to be of type A.
        // `reduceActor` is passed as the sender of the routed message.
        case msg: Message[A @unchecked] =>
          router.route(msg, reduceActor)
        // Replace a terminated child with a fresh one so the pool size stays constant.
        case Terminated(one) =>
          router = router.removeRoutee(one).addRoutee(single)
      }
    }
  }

  /** Folds every incoming value into `state` and sends the updated state to `ioActor`. */
  class Reduce[A, B](reduce: (B, A) => B, var state: B, ioActor: ActorRef) extends Actor {
    def receive: Receive = {
      // Payload type is erased; contents are trusted to be of type A.
      case msg: Message[A @unchecked] =>
        state = reduce(state, msg.contents)
        ioActor ! Message(state)
    }
  }

  /** Invokes the `io` callback with the contents of every incoming message. */
  class IO[T](io: T => Unit) extends Actor {
    def receive: Receive = {
      // Payload type is erased; contents are trusted to be of type T.
      case msg: Message[T @unchecked] =>
        io(msg.contents)
    }
  }

  /** Envelope used for every message exchanged between the pipeline stages. */
  case class Message[T](contents: T)
}

/** Demo: counts random integers, and how many of them are odd, in groups of 100. */
object Main {

  /** Pair of (numbers counted, odd numbers seen). */
  type II = (Int, Int)

  /** Summarises one group: element count and number of odd elements (`n & 1` is 1 for odd n). */
  val map: Seq[Int] => II =
    _.foldLeft((0, 0)) { (acc, n) => (acc._1 + 1, acc._2 + (n & 1)) }

  /** Adds a group summary to the running totals. */
  val reduce: (II, II) => II = {
    case ((counted, odds), (c, o)) => (counted + c, odds + o)
  }

  /** Prints the running totals; called after every reduction step. */
  val io: II => Unit = {
    case (counted, odds) => println(s"Counted: $counted Odds: $odds")
  }

  // An explicit `main` is used instead of `extends App` so the members above are ordinary,
  // eagerly initialised fields rather than being set up by App's delayed initialisation.
  def main(args: Array[String]): Unit = {
    val nums = {
      val rng = new java.security.SecureRandom
      Stream.fill(10000000) { rng.nextInt() }.grouped(100).toSeq
    }

    val mapReduce = Example.MapReduce(map, 8, reduce, (0, 0), io)
    mapReduce ! nums
  }
}
}}}
저장 버튼을 클릭하면 당신이 기여한 내용을 CC-BY-NC-SA 2.0 KR으로 배포하고,기여한 문서에 대한 하이퍼링크나 URL을 이용하여 저작자 표시를 하는 것으로 충분하다는 데 동의하는 것입니다.이 동의는 철회할 수 없습니다.캡챠저장미리보기